/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package org.apache.skywalking.oap.server.core.config.group;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.skywalking.oap.server.ai.pipeline.services.api.HttpUriPattern;
import org.apache.skywalking.oap.server.ai.pipeline.services.api.HttpUriRecognition;
import org.apache.skywalking.oap.server.core.config.group.openapi.EndpointGroupingRule4Openapi;
import org.apache.skywalking.oap.server.core.config.group.uri.quickmatch.QuickUriGroupingRule;
import org.apache.skywalking.oap.server.core.query.MetadataQueryService;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.RunnableWithExceptionProtection;
import org.apache.skywalking.oap.server.library.util.StringFormatGroup;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.vavr.Tuple2;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

/**
 * Formats (groups) endpoint names of services.
 *
 * <p>Rules are applied in this order, and the first match wins:
 * <ol>
 *   <li>OpenAPI-based rules;</li>
 *   <li>custom rules;</li>
 *   <li>AI/ML-generated URI patterns.</li>
 * </ol>
 *
 * <p>When the AI pipeline is enabled, raw URIs seen by {@link #format(String, String)} are cached. They
 * are periodically fed to the remote recognition service, and the patterns it produces are synced back.
 *
 * <p>To protect OAP and storage, a service with too many distinct unformatted URIs gets its new
 * unformatted URIs reported as {@link #ABANDONED_ENDPOINT_NAME}.
 */
@Slf4j
public class EndpointNameGrouping {
    public static final String ABANDONED_ENDPOINT_NAME = "_abandoned";

    /**
     * Endpoint grouping according to local endpoint-name-grouping.yml or associated dynamic configuration.
     * Skipped while {@code null}.
     */
    @Setter
    private volatile QuickUriGroupingRule endpointGroupingRule;
    /**
     * Endpoint grouping according to OpenAPI specification definitions. Skipped while {@code null}.
     */
    @Setter
    private volatile EndpointGroupingRule4Openapi endpointGroupingRule4Openapi;
    /**
     * AI pipeline based HTTP URI pattern recognition. The rules are automatically generated by remote AI/ML service.
     * Skipped while {@code null}; created by {@link #startHttpUriRecognitionSvr}.
     */
    @Setter
    private volatile QuickUriGroupingRule quickUriGroupingRule;
    /**
     * HTTP URIs collected per service as raw data for the remote pattern recognition.
     * The level one map key is the service name; the level two map is keyed as follows:
     * <ul>
     *   <li>For a URI formatted by the AI/ML patterns, the key is the formatted name (pattern). The value
     *   holds up to 10 raw URIs that matched it, used as candidates for re-training.</li>
     *   <li>For an unformatted URI, the key is the raw URI. The value is an empty queue (capacity 1) that is
     *   never offered to.</li>
     * </ul>
     * The level two map is drained every training period.
     */
    private final Map<String/* service */, Map<String/* uri */, Queue<String>/* candidate patterns */>> cachedHttpUris = new ConcurrentHashMap<>();
    /**
     * Unformatted URIs per service, used to cap the number of distinct unformatted endpoint names.
     * Values are mutated in place, so each per-service set is dropped 10 minutes after it was loaded.
     */
    private final LoadingCache<String/* service */, Set<String>/* unformatted uris */> unformattedHttpUrisCache =
        CacheBuilder.newBuilder().expireAfterWrite(Duration.ofMinutes(10)).build(new CacheLoader<>() {
            @Override
            public Set<String> load(String service) {
                return ConcurrentHashMap.newKeySet();
            }
        });
    private final AtomicInteger aiPipelineExecutionCounter = new AtomicInteger(0);
    /**
     * The max number of HTTP URIs per service for further URI pattern recognition.
     * Also caps distinct unformatted URIs per service.
     * Written by {@link #startHttpUriRecognitionSvr} and read concurrently by {@link #format}, hence volatile.
     */
    private volatile int maxHttpUrisNumberPerService = 3000;

    /**
     * Format the endpoint name according to the API patterns.
     *
     * <p>If the name stays unformatted and the service already tracks {@code maxHttpUrisNumberPerService}
     * other unformatted URIs in the current cache window, the name is replaced by
     * {@link #ABANDONED_ENDPOINT_NAME} and flagged as formatted. URIs already tracked keep their own name.
     *
     * @param serviceName  service name
     * @param endpointName endpoint name to be formatted.
     * @return Tuple2 The result of endpoint name formatting.
     * The first element is the formatted name or original name when not formatted.
     * The second element is a boolean representing whether the endpoint name is formatted or not.
     */
    public Tuple2<String, Boolean> format(String serviceName, String endpointName) {
        Tuple2<String, Boolean> formattedName = new Tuple2<>(endpointName, Boolean.FALSE);
        if (endpointGroupingRule4Openapi != null) {
            formattedName = formatByOpenapi(serviceName, endpointName);
        }

        if (!formattedName._2() && endpointGroupingRule != null) {
            formattedName = formatByCustom(serviceName, endpointName);
        }

        if (!formattedName._2() && quickUriGroupingRule != null) {
            formattedName = formatByQuickUriPattern(serviceName, endpointName);

            Map<String, Queue<String>> svrHttpUris =
                cachedHttpUris.computeIfAbsent(serviceName, k -> new ConcurrentHashMap<>());

            // Only cache the first N (determined by maxHttpUrisNumberPerService) URIs per training period.
            if (svrHttpUris.size() < maxHttpUrisNumberPerService) {
                if (formattedName._2()) {
                    // Algorithm side should not return a pattern that has no {var} in it else this
                    // code may accidentally retrieve the size 1 queue created by unformatted endpoint
                    // The queue size is 10, which means only cache the first 10 raw URIs per formatted name.
                    final Queue<String> formattedURIs = svrHttpUris.computeIfAbsent(
                        formattedName._1(), k -> new ArrayBlockingQueue<>(10));
                    // Try to push the raw URI as a candidate of formatted name; dropped when the queue is full.
                    formattedURIs.offer(endpointName);
                } else {
                    svrHttpUris.computeIfAbsent(endpointName, k -> new ArrayBlockingQueue<>(1));
                }
            }
        }

        // If there are too many unformatted URIs, we will abandon the unformatted URIs to reduce
        // the load of OAP and storage.
        final var unformattedUrisOfService = unformattedHttpUrisCache.getUnchecked(serviceName);
        if (!formattedName._2()) {
            // Only brand-new URIs are abandoned once the limit is reached. A URI already tracked keeps
            // its name, so the same endpoint never flips between its own name and _abandoned.
            if (!unformattedUrisOfService.contains(endpointName)) {
                if (unformattedUrisOfService.size() < maxHttpUrisNumberPerService) {
                    unformattedUrisOfService.add(endpointName);
                } else {
                    formattedName = new Tuple2<>(ABANDONED_ENDPOINT_NAME, true);
                }
            }
        } else {
            unformattedUrisOfService.remove(endpointName);
        }

        return formattedName;
    }

    /**
     * Formats by the custom (endpoint-name-grouping.yml / dynamic configuration) rules.
     */
    private Tuple2<String, Boolean> formatByCustom(String serviceName, String endpointName) {
        final StringFormatGroup.FormatResult formatResult = endpointGroupingRule.format(serviceName, endpointName);
        if (log.isDebugEnabled() || log.isTraceEnabled()) {
            if (formatResult.isMatch()) {
                log.debug("Endpoint {} of Service {} has been renamed in group {} by endpointGroupingRule",
                          endpointName, serviceName, formatResult.getName()
                );
            } else {
                log.trace("Endpoint {} of Service {} keeps unchanged.", endpointName, serviceName);
            }
        }
        return new Tuple2<>(formatResult.getReplacedName(), formatResult.isMatch());
    }

    /**
     * Formats by the OpenAPI-based rules.
     */
    private Tuple2<String, Boolean> formatByOpenapi(String serviceName, String endpointName) {
        final StringFormatGroup.FormatResult formatResult = endpointGroupingRule4Openapi.format(
            serviceName, endpointName);
        if (log.isDebugEnabled() || log.isTraceEnabled()) {
            if (formatResult.isMatch()) {
                log.debug("Endpoint {} of Service {} has been renamed in group {} by endpointGroupingRule4Openapi",
                          endpointName, serviceName, formatResult.getName()
                );
            } else {
                log.trace("Endpoint {} of Service {} keeps unchanged.", endpointName, serviceName);
            }
        }
        return new Tuple2<>(formatResult.getReplacedName(), formatResult.isMatch());
    }

    /**
     * Formats by the AI/ML-generated URI patterns.
     */
    private Tuple2<String, Boolean> formatByQuickUriPattern(String serviceName, String endpointName) {
        final StringFormatGroup.FormatResult formatResult = quickUriGroupingRule.format(serviceName, endpointName);
        if (log.isDebugEnabled() || log.isTraceEnabled()) {
            if (formatResult.isMatch()) {
                log.debug(
                    "Endpoint {} of Service {} has been renamed in group {} by AI/ML-powered URI pattern recognition",
                    endpointName, serviceName, formatResult.getName()
                );
            } else {
                log.trace("Endpoint {} of Service {} keeps unchanged.", endpointName, serviceName);
            }
        }
        return new Tuple2<>(formatResult.getReplacedName(), formatResult.isMatch());
    }

    /**
     * Applies the per-service URI limit and, when the recognition service is initialized, starts the
     * background task that feeds cached URIs to it and syncs the resulting patterns back.
     *
     * <p>The task first runs after 60 seconds and then with a fixed delay of 1 second. Both periods below
     * are counted in those ticks.
     *
     * @param httpUriRecognitionSvr                   remote URI pattern recognition service
     * @param metadataQueryService                    lists the services whose patterns are synced
     * @param syncPeriodHttpUriRecognitionPattern     ticks between two pattern syncs; must be positive
     * @param trainingPeriodHttpUriRecognitionPattern ticks between two raw-data feeds; must be positive
     * @param maxEndpointCandidatePerSvr              max cached / unformatted URIs per service
     */
    public void startHttpUriRecognitionSvr(final HttpUriRecognition httpUriRecognitionSvr,
                                           final MetadataQueryService metadataQueryService,
                                           int syncPeriodHttpUriRecognitionPattern,
                                           int trainingPeriodHttpUriRecognitionPattern,
                                           int maxEndpointCandidatePerSvr) {
        // The limit also guards unformatted URIs, so it applies even without the AI pipeline.
        this.maxHttpUrisNumberPerService = maxEndpointCandidatePerSvr;
        if (!httpUriRecognitionSvr.isInitialized()) {
            return;
        }
        this.quickUriGroupingRule = new QuickUriGroupingRule();
        Executors.newSingleThreadScheduledExecutor()
                 .scheduleWithFixedDelay(
                     new RunnableWithExceptionProtection(
                         () -> {
                             int currentExecutionCounter = aiPipelineExecutionCounter.incrementAndGet();
                             if (currentExecutionCounter % trainingPeriodHttpUriRecognitionPattern == 0) {
                                 feedCachedHttpUris(httpUriRecognitionSvr);
                             }
                             if (currentExecutionCounter % syncPeriodHttpUriRecognitionPattern == 0) {
                                 syncHttpUriPatterns(httpUriRecognitionSvr, metadataQueryService);
                             }
                         },
                         t -> log.error("Fail to recognize URI patterns.", t)
                     ), 60, 1, TimeUnit.SECONDS
                 );
    }

    /**
     * Sends the cached URIs of every service to the recognition server and removes them from the cache.
     */
    private void feedCachedHttpUris(final HttpUriRecognition httpUriRecognitionSvr) {
        cachedHttpUris.forEach((serviceName, httpUris) -> {
            final List<HttpUriRecognition.HTTPUri> candidates4UriPatterns = new ArrayList<>(httpUris.size());
            // Remove entries one by one instead of clearing the whole map afterwards. URIs cached concurrently
            // by format() while this batch is collected then stay for the next round instead of being lost.
            // ConcurrentHashMap key-set iteration tolerates concurrent removal.
            for (final String uri : httpUris.keySet()) {
                final Queue<String> candidates = httpUris.remove(uri);
                if (candidates == null) {
                    continue;
                }
                if (candidates.isEmpty()) {
                    // unrecognized uri
                    candidates4UriPatterns.add(new HttpUriRecognition.HTTPUri(uri));
                } else {
                    // formatted name: send its raw URI candidates instead of the pattern itself
                    String candidateUri;
                    while ((candidateUri = candidates.poll()) != null) {
                        candidates4UriPatterns.add(new HttpUriRecognition.HTTPUri(candidateUri));
                    }
                }
            }
            httpUriRecognitionSvr.feedRawData(serviceName, candidates4UriPatterns);
        });
    }

    /**
     * Fetches the latest patterns of every known service from the recognition server
     * and adds them to {@link #quickUriGroupingRule}.
     */
    private void syncHttpUriPatterns(final HttpUriRecognition httpUriRecognitionSvr,
                                     final MetadataQueryService metadataQueryService) {
        try {
            metadataQueryService.listServices(null, null).forEach(
                service -> {
                    final List<HttpUriPattern> patterns
                        = httpUriRecognitionSvr.fetchAllPatterns(service.getName());
                    if (CollectionUtils.isNotEmpty(patterns)) {
                        patterns.forEach(
                            p -> quickUriGroupingRule.addRule(
                                service.getName(), p.getPattern()));
                    }
                }
            );
        } catch (IOException e) {
            log.error("Fail to load all services.", e);
        }
    }
}
